package com.mai.realtime.app.dwd.db;

import com.mai.realtime.app.BaseSqlApp;
import com.mai.realtime.common.Constant;
import com.mai.realtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * com.atguigu.realtime.app.dwd.db.Dwd_CourseReviewInfo
 * 课程评价事实表
 */
public class Dwd_CourseReviewInfo extends BaseSqlApp {
    public static void main(String[] args) {
        new Dwd_CourseReviewInfo().init(
                3001,
                3,
                "Dwd_CourseReviewInfo"
        );
    }
    @Override
    protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {
        //读取ods_db
        readOdsDb(tEnv,"Dwd_CourseReviewInfo");

        // 从动态表读取课程评价表
        Table result = tEnv.sqlQuery("select " +
                " data['id'] id ," +
                " data['user_id'] user_id ," +
                " data['course_id'] course_id ," +
                " data['review_txt'] review_txt ," +
                " data['review_stars'] review_stars ," +
                " data['create_time'] create_time ," +
                " data['deleted'] deleted ," +
                " ts ts" +
                " from ods_db" +
                "where `database`='gmall' " +
                " and `table`='review_info' " +
                " and `type`='insert'");

        tEnv.createTemporaryView("reviewInfo",result);

        // 定义动态表输出topic关联
        tEnv.executeSql("create table dwd_course_review_info(" +
                " id string," +
                " user_id string," +
                " course_id string," +
                " review_txt string," +
                " review_stars string," +
                " create_time string," +
                " deleted string," +
                " ts BIGINT" +
                ")"+ SQLUtil.getKafkaSink(Constant.TOPIC_DWD_COURSE_REVIEW_INFO)
        );

        // 将结果输出DWD
        result
                // .execute().print();
                .executeInsert("dwd_course_review_info");

    }
}
